package com.biquge.common;

import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;


public class ThreadPoolUtils {

    public static final String ASYNC_TREAD_NAME = "Async Thread";

    /**
     * CPU 核数
     */
    public static final int AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors();

    public static final int DEFAULT_SIZE = 8;

    /**
     * 核心线程数
     */
    private static final int CORE_SIZE = Math.max(AVAILABLE_PROCESSORS, DEFAULT_SIZE);

    /**
     * 最大线程数（核心线程全部在执行任务，且任务达到最大容量，才开始创建线程执行任务）
     */
    public static final int MAX_SIZE = CORE_SIZE << 1;

    /**
     * 空闲线程存活时间
     */
    private static final int KEEP_ALIVE_TIME = 20;

    /**
     * 任务队列容量
     */
    private static final int QUEUE_CAPACITY = 200;

    /**
     * 线程池
     */
    private static volatile ThreadPoolExecutor executor;

    private ThreadPoolUtils(){}


    /**
     * 获取线程池
     *
     * @return 线程池
     */
    private static ThreadPoolExecutor getExecutor() {
        if (null == executor) {
            synchronized (ThreadPoolUtils.class) {
                if (null == executor) {
                    executor = new ThreadPoolExecutor(
                            CORE_SIZE, // 核心线程数
                            MAX_SIZE, // 最大线程数
                            KEEP_ALIVE_TIME, // 线程存活时间
                            TimeUnit.MINUTES, // 时间单位
                            new LinkedBlockingDeque<>(QUEUE_CAPACITY), // 线程阻塞队列
                            Executors.defaultThreadFactory(), // 创建线程工厂
                            new ThreadPoolExecutor.CallerRunsPolicy());// 队列满了拒绝策略-由调用线程执行任务

                    // 系统关闭时释放线程池
                    Runtime.getRuntime().addShutdownHook(new Thread(() -> {
                        executor.shutdown(); // 如果存在没执行完的任务，则执行完在关闭；
                    }));
                } else {
                    return executor;
                }
            }
        }
        return executor;
    }

    /**
     * 创建新线程池
     *
     * @return 线程池
     */
    public static ThreadPoolTaskExecutor newThreadPoolTaskExecutor(String name) {
        if (StringUtils.isBlank(name)) {
            name = ASYNC_TREAD_NAME;
        }
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(CORE_SIZE); // 核心线程数
        taskExecutor.setMaxPoolSize(MAX_SIZE); // 最大线程数
        taskExecutor.setQueueCapacity(QUEUE_CAPACITY); // 队列容量
        taskExecutor.setThreadNamePrefix(name + "-"); // 线程名前缀
        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());// 队列满了拒绝策略-由调用线程执行任务
        return taskExecutor;
    }


    /**
     * 执行任务-无返回结果
     *
     * @param runner 可执行任务
     */
    public static void run(Runnable runner) {
        if (null == runner) {
            return;
        }
        getExecutor().execute(runner);
    }

    /**
     * 执行任务-无返回结果
     *
     * @param runners 可执行任务
     */
    public static void run(Collection<Runnable> runners) {
        if (CollectionUtils.isEmpty(runners)) {
            return;
        }
        ThreadPoolExecutor executor = getExecutor();
        runners.forEach(executor::execute);
    }

    /**
     * 执行任务-有返回值
     *
     * @param caller 可执行任务
     * @param <T>    泛型
     * @return 异步返回结果
     */
    public static <T> Future<T> submit(Callable<T> caller) {
        if (null == caller) {
            return null;
        }
        return getExecutor().submit(caller);
    }


    /**
     * 执行任务-有返回值
     *
     * @param callers 可执行任务
     * @param <T>     泛型
     * @return 异步返回结果
     */
    public static <T> List<Future<T>> submit(Collection<Callable<T>> callers) {
        if (CollectionUtils.isEmpty(callers)) {
            return new ArrayList<>();
        }
        ThreadPoolExecutor executor = getExecutor();
        return callers.stream().map(executor::submit).collect(Collectors.toList());
    }

    /**
     * 获取结果
     *
     * @param future  Future
     * @param seconds 秒
     * @param <T>     泛型
     * @return 结果
     */
    public static <T> T waitSecondAndGet(Future<T> future, Integer seconds) {
        return waitAndGet(Collections.singletonList(future), seconds, TimeUnit.SECONDS).get(0);
    }

    /**
     * 获取结果
     *
     * @param future  Future
     * @param minutes 分钟
     * @param <T>     泛型
     * @return 结果
     */
    public static <T> T waitMinuteAndGet(Future<T> future, Integer minutes) {
        return waitAndGet(Collections.singletonList(future), minutes, TimeUnit.MINUTES).get(0);
    }

    /**
     * 获取结果
     *
     * @param futures Future
     * @param seconds 秒
     * @param <T>     泛型
     * @return 结果
     */
    public static <T> Map<Integer, T> waitSecondAndGet(List<Future<T>> futures, Integer seconds) {
        return waitAndGet(futures, seconds, TimeUnit.SECONDS);
    }

    /**
     * 获取结果
     *
     * @param futures Future
     * @param minutes 分钟
     * @param <T>     泛型
     * @return 结果
     */
    public static <T> Map<Integer, T> waitMinuteAndGet(List<Future<T>> futures, Integer minutes) {
        return waitAndGet(futures, minutes, TimeUnit.MINUTES);
    }

    /**
     * 获取结果
     *
     * @param futures Future
     * @param timeout 时间
     * @param unit    时间单位
     * @param <T>     泛型
     * @return 结果
     */
    public static <T> Map<Integer, T> waitAndGet(List<Future<T>> futures, Integer timeout, TimeUnit unit) {
        Map<Integer, T> result = new HashMap<>();
        if (CollectionUtils.isEmpty(futures)) {
            return result;
        }
        for (int i = 0; i < futures.size(); i++) {
            try {
                T t = futures.get(i).get(timeout, unit);
                result.put(i, t);
            } catch (Exception e) {
                result.put(i, null);
                e.printStackTrace();
            }
        }
        return result;
    }

}
